/*
 * Scala (https://www.scala-lang.org)
 *
 * Copyright EPFL and Lightbend, Inc. dba Akka
 *
 * Licensed under Apache License 2.0
 * (http://www.apache.org/licenses/LICENSE-2.0).
 *
 * See the NOTICE file distributed with this work for
 * additional information regarding copyright ownership.
 */

package scala.async

import java.util.Objects

import scala.util.{Failure, Success, Try}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal

/** The base class for state machines generated by the `scala.async.Async.async` macro.
 *  Not intended to be directly extended in user-written code.
 */
abstract class FutureStateMachine(execContext: ExecutionContext) extends Function1[Try[AnyRef], Unit] {
  Objects.requireNonNull(execContext)

  type F = scala.concurrent.Future[AnyRef]
  type R = scala.util.Try[AnyRef]

  private[this] val result$async: Promise[AnyRef] = Promise[AnyRef]();
  private[this] var state$async: Int = 0

  /** Retrieve the current value of the state variable */
  protected def state: Int = state$async

  /** Assign `i` to the state variable */
  protected def state_=(s: Int): Unit = state$async = s

  NonFatal // eagerly classloading NonFatal to reduce the chance of a cascading StackOverflowError in `completeFailure`

  /** Complete the state machine with the given failure. */
  protected def completeFailure(t: Throwable): Unit = t match {
    case NonFatal(t) =>
      result$async.complete(Failure(t))
    case _ =>
      throw t
  }

  /** Complete the state machine with the given value. */
  protected def completeSuccess(value: AnyRef): Unit = {
    result$async.complete(Success(value))
  }

  /** Register the state machine as a completion callback of the given future. */
  protected def onComplete(f: F): Unit = {
    f.onComplete(this)(execContext)
  }

  /** Extract the result of the given future if it is complete, or `null` if it is incomplete. */
  protected def getCompleted(f: F): Try[AnyRef] = {
    if (f.isCompleted) {
      f.value.get
    } else null
  }

  /**
   * Extract the success value of the given future. If the state machine detects a failure it may
   * complete the async block and return `this` as a sentinel value to indicate that the caller
   * (the state machine dispatch loop) should immediately exit.
   */
  protected def tryGet(tr: R): AnyRef = tr match {
    case Success(value) =>
      value.asInstanceOf[AnyRef]
    case Failure(throwable) =>
      completeFailure(throwable)
      this // sentinel value to indicate the dispatch loop should exit.
  }

  def start[T](): Future[T] = {
    // This cast is safe because we know that `def apply` does not consult its argument when `state == 0`.
    Future.unit.asInstanceOf[Future[AnyRef]].onComplete(this)(execContext)
    result$async.future.asInstanceOf[Future[T]]
  }
}
